[HUDI-5023] Cleaning up QueueBasedExecutor impls #7238
Conversation
Ack. Will finish my review this week.
6241a82 to eb19e94
```java
 * Start all producers at once.
 */
@Override
public CompletableFuture<Void> startProducers() {
```
This method (w/ modifications) has been moved into the base class to be shared across impls.
```java
// Consumer
protected final Option<HoodieConsumer<O, E>> consumer;

public BaseHoodieQueueBasedExecutor(List<HoodieProducer<I>> producers,
```
Ideas underpinning this class:
- Abstract common logic across impls
- Make sure all concurrency is handled exclusively in the base class (impl should just write sync code of how to produce/consume to/from the queue)
- Make sure all resource lifecycle handling happens in the base class
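A minimal, hypothetical sketch of that split (illustrative names only, not Hudi's actual API): the base class owns the queue and all async orchestration, while subclasses write plain synchronous produce/consume code.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative skeleton, not the real BaseHoodieQueueBasedExecutor:
// concurrency lives here; impls only fill in the sync hooks
abstract class QueueBasedExecutorSketch<I, O> {
  protected final Queue<I> queue = new ConcurrentLinkedQueue<>();

  // Subclasses write plain synchronous code against the queue
  protected abstract void doProduce(Queue<I> queue);
  protected abstract O doConsume(Queue<I> queue);

  // All async orchestration is handled in the base class
  public final O execute() {
    CompletableFuture<Void> producing =
        CompletableFuture.runAsync(() -> doProduce(queue));
    // Simplification: consume only after producing finishes; the real
    // executor runs both concurrently against a bounded queue
    CompletableFuture<O> consuming =
        producing.thenApply(ignored -> doConsume(queue));
    return consuming.join();
  }
}
```

The point of the sketch is the division of responsibility, not the scheduling details: an impl never touches executors or futures directly.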
```java
 * HoodieExecutorBase holds common elements producerExecutorService, consumerExecutorService, producers and a single consumer.
 * Also HoodieExecutorBase control the lifecycle of producerExecutorService and consumerExecutorService.
 */
public abstract class HoodieExecutorBase<I, O, E> implements HoodieExecutor<I, O, E> {
```
This class has been replaced by BaseHoodieQueueBasedExecutor
```java
/**
 * IteratorBasedHoodieMessageQueue implements HoodieMessageQueue with Iterable
 */
public abstract class HoodieIterableMessageQueue<I, O> implements HoodieMessageQueue<I, O>, Iterable<O> {
```
This is not needed anymore
```java
/**
 * Consume entries from queue and execute callback function.
 */
public abstract class IteratorBasedQueueConsumer<I, O> implements HoodieConsumer<I, O> {
```
This is not needed anymore
```java
Schema logSchema,
Configuration hadoopConf,
org.apache.flink.configuration.Configuration flinkConf) {
  HoodieUnMergedLogRecordScanner.Builder scannerBuilder = HoodieUnMergedLogRecordScanner.newBuilder()
```
This refactoring is necessary to avoid exposing the executor's internals (the queue); instead we rely on the provided interfaces to configure it.
@hudi-bot run azure
eb19e94 to e17b127
zhangyue19921010 left a comment
Nice work! Just left a few minor comments.
```java
/**
 * HoodieExecutorBase holds common elements producerExecutorService, consumerExecutorService, producers and a single consumer.
 * Also HoodieExecutorBase control the lifecycle of producerExecutorService and consumerExecutorService.
 */
public abstract class BaseHoodieQueueBasedExecutor<I, O, E> implements HoodieExecutor<I, O, E> {
```
nit: HoodieExecutorBase => BaseHoodieQueueBasedExecutor
```java
public final CompletableFuture<Void> startProducingAsync() {
  return allOf(producers.stream()
      .map(producer -> CompletableFuture.supplyAsync(() -> {
        doProduce(queue, producer);
        return (Void) null;
      }, producerExecutorService))
      .collect(Collectors.toList())
  )
      .thenApply(ignored -> (Void) null)
      .whenComplete((result, throwable) -> {
        // Regardless of how producing has completed, we have to close producers
        // to make sure resources are properly cleaned up
        producers.forEach(HoodieProducer::close);
        // Mark production as done so that consumer will be able to exit
        queue.seal();
      });
}
```
How about using:
```java
public final CompletableFuture<Void> startProducingAsync() {
  return CompletableFuture.allOf(producers.stream().map(producer -> {
    return CompletableFuture.supplyAsync(() -> {
      doProduce(queue, producer);
      return (Void) null;
    }, producerExecutorService);
  }).toArray(CompletableFuture[]::new)).whenComplete(new BiConsumer<Void, Throwable>() {
    @Override
    public void accept(Void aVoid, Throwable throwable) {
      // Regardless of how producing has completed, we have to close producers
      // to make sure resources are properly cleaned up
      producers.forEach(HoodieProducer::close);
      // Mark production as done so that consumer will be able to exit
      queue.seal();
    }
  });
}
```
Maybe there's no need to do `.thenApply(ignored -> (Void) null)`.
In #7245, I'm actually going to replace this with an allOf instance having slightly different semantics (allowing cancellations to cascade upon the first encountered failure).
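For illustration, a hedged sketch of what such fail-fast semantics could look like (hypothetical helper, not the actual #7245 implementation): the first exceptionally-completed future cancels its siblings instead of letting them run to completion.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical allOf variant: completes when all futures complete, but as
// soon as any one of them fails, the remaining futures are cancelled so the
// pipeline shuts down quickly instead of waiting for stragglers
class FailFastFutures {
  static CompletableFuture<Void> allOfCancellingOnFailure(List<CompletableFuture<?>> futures) {
    CompletableFuture<Void> combined =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    for (CompletableFuture<?> f : futures) {
      f.whenComplete((res, err) -> {
        if (err != null) {
          // Cascade the failure: cancel every sibling (cancel on an
          // already-completed future is a no-op)
          futures.forEach(other -> other.cancel(true));
        }
      });
    }
    return combined;
  }
}
```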
```java
  return producing.thenCombine(consuming, (aVoid, anotherVoid) -> null)
      .whenComplete((ignored, throwable) -> {
        // Close the queue to release the resources
        queue.close();
      })
      .thenApply(ignored -> consumer.get().finish())
      // Block until producing and consuming both finish
      .join();
} catch (Exception e) {
  throw new HoodieException(e);
}
```
It seems that TestBoundedInMemoryExecutorInSpark#testInterruptExecutor is blocked here, which causes the UT to time out.
Maybe we can revisit the logic here for handling the interrupt exception.
Yeah, join() actually doesn't throw InterruptedException. Addressed.
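To illustrate the distinction: `CompletableFuture.join()` wraps failures in an unchecked `CompletionException` and cannot be woken by `Thread.interrupt()`, while `get()` declares `InterruptedException` and is interruptible. A small illustrative helper (not the code used in this PR):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class JoinVsGet {
  // Hypothetical helper: awaits a future in a way that an interrupted test
  // (like testInterruptExecutor) can actually unblock
  static <T> T awaitInterruptibly(CompletableFuture<T> future) throws InterruptedException {
    try {
      // get() declares InterruptedException, so a thread blocked here can be
      // interrupted; join() offers no such hook and would block indefinitely
      return future.get();
    } catch (ExecutionException e) {
      // Re-wrap the underlying failure as the caller's unchecked exception
      throw new RuntimeException(e.getCause());
    }
  }
}
```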
```java
// Consumer
protected final Option<HoodieConsumer<O, E>> consumer;

public BaseHoodieQueueBasedExecutor(List<HoodieProducer<I>> producers,
```
```java
@Override
public void shutdownNow() {
  producerExecutorService.shutdownNow();
  consumerExecutorService.shutdownNow();
```
Maybe we need to close the current queue in this shutdownNow func as well.
We actually close the queue within the execute method, even in the presence of exceptions (which will cascade from shutdownNow if invoked). The idea here is that shutdownNow should only be used as a last-ditch effort to (quickly!) shut down the spun-up threads.
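A toy sketch of that contract (hypothetical names, assumed behavior): execute() always releases the queue on its own, including on the exception path, so shutdownNow() can stay limited to interrupting the thread pools.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative lifecycle sketch, not Hudi's actual executor: the queue is
// owned and released by execute(), while shutdownNow() only tears down threads
class ExecutorLifecycleSketch {
  private final ExecutorService producerPool = Executors.newFixedThreadPool(2);
  private final ExecutorService consumerPool = Executors.newSingleThreadExecutor();
  private boolean queueClosed = false;

  public void execute(Runnable work) {
    try {
      work.run();
    } finally {
      // The queue is always released here, even if work threw (e.g. because
      // shutdownNow() interrupted the pools and the failure cascaded)
      queueClosed = true;
    }
  }

  // Last-ditch, quick teardown of the spun-up threads only; deliberately
  // does not touch the queue
  public void shutdownNow() {
    producerPool.shutdownNow();
    consumerPool.shutdownNow();
  }

  public boolean isQueueClosed() {
    return queueClosed;
  }
}
```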
```java
public class QueueBasedExecutorFactory {
```
Maybe we can also rename QueueBasedExecutorFactory to ExecutorFactory, because we will support SimpleExecutor here, which has no inner queue.
Agreed. Let's take this up in the PR introducing SimpleExecutor
5e9080b to 3532ca4
0c62553 to a76c39f
@hudi-bot run azure
codope left a comment
@alexeykudinkin @zhangyue19921010 Do you plan to re-run the same benchmark as in #5416 for this PR as well?
```java
@Override
public float getProgress() throws IOException {
  return Math.min(parquetReader.getProgress(), logRecordScanner.getProgress());
  // TODO fix to reflect scanner progress
```
Why would we need that for "unmerged" record reader?
I think this is just tracking progress in terms of how many records were processed from the base-file vs logs
Synced up offline. This will be addressed in followup PR.
```java
})
.build();
// Scan all the delta-log files, filling in the queue
scanner.scan();
```
Maybe we can assign the scanner progress here, which can be used in L157.
Yep, I ran this benchmark several times on this PR and the results are the same as before. Thanks for the reminder!
LGTM! Thanks for this GREAT work!
After UTs are green, maybe we can land it. Maybe we can rebase #7346 on master and have another try.
Cleaned up `HoodieIterableMessageQueue`
…tind down the queue prematurely)
Fixed handling of the case when there's no consumer provided (closing the queue prematurely)
…e of DQ returning prematurely
a76c39f to e4652c0
- Further cleaning up some of the historically inherited artifacts
- Replacing remaining usages of the BIMQ w/ QueueBasedExecutorFactory
- Adding missing deps to bundles
- Lifted up all concurrency management to `BaseHoodieQueueBasedExecutor`
- Fixed handling of the case when there's no consumer provided (closing the queue prematurely)
Change Logs
This is a follow-up PR for #5416 that is cleaning up the QueueBasedExecutor impls and introducing QueueBasedExecutorFactory.
This change is a precursor for both #7245 and #7174.
Impact
No impact
Risk level (write none, low medium or high below)
Low
Documentation Update
N/A
Contributor's checklist